{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "1b8a5f16-3d51-4690-80b3-95c7899c5474",
   "metadata": {},
   "source": [
    "# Using Ray for Scaling Up\n",
    "\n",
    "Daft's default Native runner is great for experimentation on your laptop, but when it comes time to run much more computationally expensive jobs that need large-scale parallelism, you can run Daft on a [Ray](https://www.ray.io/) cluster instead.\n",
    "\n",
    "## What is a Ray Cluster, and why do I need it?\n",
    "\n",
    "Ray is a framework that exposes a Python interface for running distributed computation over a cluster of machines. Daft is built to use Ray as a backend for running dataframe operations, allowing it to scale to huge amounts of data and computation.\n",
    "\n",
    "However, even if you do not have a large cluster, you can run Ray locally on your laptop. In that case Ray spins up a cluster of just a single machine (your laptop), and Daft's Ray backend lets Daft fully utilize your machine's cores.\n",
    "\n",
    "## Let's get started!\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "bbe8f7c1-ae08-49b9-96c7-4b16cf48f479",
   "metadata": {},
   "outputs": [],
   "source": [
    "!pip install daft[ray]\n",
    "!pip install Pillow"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "28d1d421",
   "metadata": {
    "tags": [
     "parameters"
    ]
   },
   "outputs": [],
   "source": [
    "CI = False"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "cb73a22b",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "import daft\n",
    "\n",
    "USE_RAY = not CI\n",
    "NUM_ROWS_LIMIT = 16 if CI else 160\n",
    "IO_CONFIG = daft.io.IOConfig(\n",
    "    s3=daft.io.S3Config(anonymous=True, region_name=\"us-west-2\")\n",
    ")  # Use anonymous-mode for accessing AWS S3\n",
    "PARQUET_URL = \"s3://daft-public-data/tutorials/laion-parquet/train-00000-of-00001-6f24a7497df494ae.parquet\"\n",
    "\n",
    "daft.set_planning_config(default_io_config=IO_CONFIG)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "8a3ef6f2-550b-4668-9d41-0417e98e22f7",
   "metadata": {},
   "source": [
    "By default, Daft uses the Native runner, which runs all processing on a single machine.\n",
    "\n",
    "To activate the RayRunner, you can either:\n",
    "\n",
    "1. Set the `DAFT_RUNNER=ray` environment variable, and optionally `RAY_ADDRESS`.\n",
    "2. Call `daft.context.set_runner_ray(...)` at the start of your program.\n",
    "\n",
    "We'll demonstrate option 2 here!"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "d73050cd",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "import daft\n",
    "\n",
    "if USE_RAY:\n",
    "    RAY_ADDRESS = None\n",
    "    daft.context.set_runner_ray(\n",
    "        # You may provide Daft with the address to an existing Ray cluster if you have one!\n",
    "        # If this is not provided, Daft will default to spinning up a single-node Ray cluster consisting of just your current local machine\n",
    "        address=RAY_ADDRESS,\n",
    "    )"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "5bb090a9",
   "metadata": {},
   "source": [
    "Let's try to download the images from our previous [Text-to-Image Generation tutorial](https://colab.research.google.com/github/Eventual-Inc/Daft/blob/main/tutorials/text_to_image/text_to_image_generation.ipynb) with the RayRunner instead."
   ]
  },
  {
   "cell_type": "markdown",
   "id": "448057cd-52f7-449d-9c1c-6d6368de9cb2",
   "metadata": {},
   "source": [
    "We limit the dataset to 160 rows and repartition it into 8 partitions for demonstration purposes. This just means that our data will be divided into 8 approximately equal-sized \"chunks\"."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "f2639220-6c8f-48e2-9d8b-8301de21b8f2",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "from daft import col\n",
    "\n",
    "parquet_df = daft.read_parquet(PARQUET_URL, io_config=IO_CONFIG).limit(NUM_ROWS_LIMIT).repartition(8)\n",
    "parquet_df.collect()"
   ]
  },
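  {
   "cell_type": "markdown",
   "id": "partition-chunks-sketch",
   "metadata": {},
   "source": [
    "To make the idea of partitions concrete, here is a minimal pure-Python sketch of dividing rows into approximately equal-sized chunks. The helper `split_into_partitions` is hypothetical and written only for illustration; it is not part of Daft's API and does not claim to match how `.repartition(8)` distributes rows internally.\n",
    "\n",
    "```python\n",
    "def split_into_partitions(rows, num_partitions):\n",
    "    # Hypothetical helper: spread rows over num_partitions chunks whose sizes differ by at most one\n",
    "    base, extra = divmod(len(rows), num_partitions)\n",
    "    partitions = []\n",
    "    start = 0\n",
    "    for i in range(num_partitions):\n",
    "        # The first `extra` chunks take one additional row each\n",
    "        size = base + (1 if i < extra else 0)\n",
    "        partitions.append(rows[start:start + size])\n",
    "        start += size\n",
    "    return partitions\n",
    "\n",
    "\n",
    "rows = list(range(160))  # stand-in for the 160 rows of the dataframe\n",
    "partitions = split_into_partitions(rows, 8)\n",
    "print([len(p) for p in partitions])\n",
    "```\n",
    "\n",
    "With 160 rows and 8 partitions, every chunk in this sketch holds 20 rows. A distributed runner can then hand different chunks to different workers."
   ]
  },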
  {
   "cell_type": "markdown",
   "id": "a601555b-ab09-4c6a-87dd-77a761b351b3",
   "metadata": {},
   "source": [
    "## Download data from URLs\n",
    "\n",
    "Now, let's try downloading the data from the URLs with `.url.download()`!"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "01bf3fde-5a64-49e3-8f32-cbf180905efe",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "images_df = parquet_df.with_column(\"images\", col(\"URL\").url.download(on_error=\"null\"))\n",
    "images_df.collect()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "d1ece805-b991-4b75-a3c0-ca00354365f2",
   "metadata": {},
   "source": [
    "On Google Colab, this should take approximately 10 seconds, compared with about 20 seconds with the default local runner!"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "c2278cc6-f3c6-44da-9f38-684e220a43e1",
   "metadata": {},
   "source": [
    "With exactly the same code, we were able to achieve a 2x speedup in execution. What happened here?\n",
    "\n",
    "It turns out that our workload is [I/O bound](https://en.wikipedia.org/wiki/I/O_bound): most of the time is spent waiting for data to be downloaded from the URLs.\n",
    "\n",
    "By default, the `.url.download()` UDF requests `num_cpus=1`. Since our Google Colab machine has 2 CPUs, the RayRunner is able to run two of these UDFs in parallel, hence achieving a 2x increase in throughput!"
   ]
  },
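  {
   "cell_type": "markdown",
   "id": "io-bound-parallelism-sketch",
   "metadata": {},
   "source": [
    "The effect described above can be reproduced in miniature without Daft or Ray. The sketch below is an illustration under stated assumptions: `fake_download` is a hypothetical stand-in that sleeps instead of making a network request, the URLs are placeholders, and a standard-library thread pool plays the role of the two parallel workers. It is not how the RayRunner schedules work internally.\n",
    "\n",
    "```python\n",
    "import time\n",
    "from concurrent.futures import ThreadPoolExecutor\n",
    "\n",
    "\n",
    "def fake_download(url):\n",
    "    # Hypothetical stand-in for a network request: the worker just waits, so the CPU stays idle\n",
    "    time.sleep(0.1)\n",
    "    return len(url)\n",
    "\n",
    "\n",
    "# Placeholder URLs, used only as inputs to the simulated download\n",
    "urls = ['https://example.com/image-' + str(i) for i in range(8)]\n",
    "\n",
    "\n",
    "def timed_run(max_workers):\n",
    "    # Run every simulated download with the given number of workers and time the whole batch\n",
    "    start = time.perf_counter()\n",
    "    with ThreadPoolExecutor(max_workers=max_workers) as pool:\n",
    "        results = list(pool.map(fake_download, urls))\n",
    "    return results, time.perf_counter() - start\n",
    "\n",
    "\n",
    "serial_results, serial_seconds = timed_run(max_workers=1)\n",
    "parallel_results, parallel_seconds = timed_run(max_workers=2)\n",
    "print('1 worker: ', round(serial_seconds, 2), 'seconds')\n",
    "print('2 workers:', round(parallel_seconds, 2), 'seconds')\n",
    "```\n",
    "\n",
    "Because each simulated download spends its time waiting rather than computing, two workers should finish the batch in roughly half the wall-clock time of one while producing the same results."
   ]
  },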
  {
   "cell_type": "markdown",
   "id": "8430f756-f641-4bfa-9425-8d72e200d726",
   "metadata": {},
   "source": [
    "## Remote Ray Clusters\n",
    "\n",
    "We have seen that the RayRunner provides some speedup even when running locally. However, the real power of distributed computing is in allowing us to access thousands of CPUs and GPUs in the cloud, on a remote Ray cluster.\n",
    "\n",
    "For example, UDFs that each request a single GPU can run in parallel across hundreds of GPUs on a remote Ray cluster, effortlessly scaling your workloads up to take full advantage of the available hardware.\n",
    "\n",
    "To run Daft on large clusters, check out [Eventual](https://www.eventualcomputing.com), which offers a fully managed platform for running Daft at scale."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "6f159a45-dbf5-4e19-b2c6-0d15474b270c",
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.9.18"
  },
  "vscode": {
   "interpreter": {
    "hash": "e5d77f7bd5a748e4f6412a25f9708ab7af36936de941fc795d1a6b75eb2da082"
   }
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
